#coding:utf8
import time

from pyspark.sql import *
import os
from pyspark.sql.types import *
import pandas as pd
import pyspark.sql.functions as f
from pyspark import *

if __name__ == '__main__':
    # Build a local SparkSession.
    # spark.sql.shuffle.partitions controls the default number of partitions
    # produced by shuffle stages in Spark SQL (default is 200, which is
    # excessive for local mode). Note this setting is independent of the
    # RDD-level parallelism settings.
    spark = SparkSession.builder.appName("test11_wordcount_demo") \
        .master("local[*]") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    # Input: tab-separated ratings file (MovieLens u.data style) read with an
    # explicit schema so column names/types are fixed up front.
    localhost_path = "file://" + os.getcwd() + "/../data/input/sql/u.data"
    schema = StructType().add("user_id", StringType(), True) \
        .add("movie_id", IntegerType(), True) \
        .add("rank", IntegerType(), True) \
        .add("ts", StringType(), True)
    df = spark.read.csv(localhost_path, schema, header=False, sep='\t', encoding="utf-8")

    # Output directories, one per sink format.
    outPut_path2 = "file://" + os.getcwd() + "/../data/output_txt"
    outPut_path3 = "file://" + os.getcwd() + "/../data/output_csv"
    outPut_path4 = "file://" + os.getcwd() + "/../data/output_json"
    # NOTE(review): "partquet" looks like a typo for "parquet"; kept as-is in
    # case downstream jobs read this exact path — confirm before renaming.
    outPut_path5 = "file://" + os.getcwd() + "/../data/output_partquet"

    # DataFrameWriter.text() can only write a single string column, so first
    # collapse all columns into one '@'-delimited string column.
    df_one = df.select(f.concat_ws("@", "user_id", "movie_id", "rank", "ts"))

    df_one.write.mode("overwrite").text(outPut_path2)
    df.write.mode("overwrite").csv(outPut_path3, sep=";", header=True)
    df.write.mode("overwrite").json(outPut_path4)
    df.write.mode("overwrite").parquet(outPut_path5)




